Prototype pipeline: parse tradb files, extract features, and train/compare classifiers.
%matplotlib notebook
%load_ext autoreload
%autoreload 2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from _plot import distplot, boxenplot, pairplot, corrplot, plot_learning_curve
from _compute import compute_features, load_hits
from _preprocess import crop_tra
import _features
# MSG
# Dataset "MSG": three placeholder class names.
config = {
    "basedir": "/mnt/c/AE-Data/msg",
    "classes": ["class_1", "class_2", "class_3"]
}
# Lidy
# Dataset "Lidy": four labeled measurement classes.
# NOTE(review): this assignment overwrites the MSG config above — in notebook
# use only the last-executed config cell is active.
config = {
    "basedir": "/mnt/c/AE-Data/Daten_TT",
    "classes": ["Brause_6884", "Faserreibung_6865", "Sand_6943", "Sand-Reibung_7001"]
}
Explore the dataset based on the pridb files and standard hit features
# Load the standard hit features from the pridb files for all classes.
df_hits = load_hits(config["basedir"], config["classes"])
# Class balance: number of hits per class.
plt.figure(figsize=(5, 2), tight_layout=True)
sns.countplot(y="class", data=df_hits)
# Per-feature distributions, grouped by class.
boxenplot(df_hits, "class", cols=4, logscale=False)
def extract(tra):
    """Feature extractor for a single transient record.

    Crops the transient to a fixed-length window, then merges the
    standard, hit-based and spectral feature dictionaries into one.
    """
    window_samples = 8192
    cropped = crop_tra(tra, window_samples, offset=None)
    features = {}
    features.update(_features.standard(cropped))
    features.update(_features.hit(cropped))
    features.update(
        _features.spectral(
            cropped, f_min=10e3, f_max=500e3, n_bands=128, log_freq=False, n_mels=64, n_mels_max=None
        )
    )
    return features
Try to imitate the VisualClass feature extractor:
#def extract(tra):
# return _features.visual_class(tra, samples=2048, f_min=80, f_max=550e3)
%%time
# Extract features for every transient, parallelized over 7 worker processes.
df = compute_features(config["basedir"], config["classes"], extract, multiprocess=True, processes=7, chunksize=10)
print(f"Samples: {df.shape[0]}")
print(f"Features: {df.shape[1] - 1}")  # minus the "class" label column
# Visual inspection of the extracted feature distributions per class.
distplot(df, "class")
boxenplot(df, "class")
# pairplot(df.sample(100), "class")
# pairplot(df, "class")
corrplot(df)
Rough estimate, considering only the traces of the within/between scatter matrices
def fisher_ratios(dataframe: pd.DataFrame, class_column: str) -> pd.Series:
    """Return the Fisher ratio (between-class / within-class variance) per feature.

    Rough univariate separability score: only the diagonals (traces) of the
    scatter matrices are considered, i.e. each feature is rated independently.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Feature matrix containing one label column named ``class_column``;
        all other columns must be numeric.
    class_column : str
        Name of the label column.

    Returns
    -------
    pd.Series
        Fisher ratio per feature column (higher = better class separation).
        A feature with zero within-class variance yields ``inf``.
    """
    # BUG FIX: aggregate only the numeric feature columns. Calling mean()/var()
    # on the full frame (including the non-numeric label column) raises a
    # TypeError on pandas >= 2.0.
    features = dataframe.drop(columns=class_column)
    grouped = features.groupby(dataframe[class_column], observed=False)
    var_between = ((grouped.mean() - features.mean()) ** 2).sum()
    var_within = grouped.var().sum()
    return var_between / var_within
# Rank all features by their Fisher ratio (univariate class separability).
fr = fisher_ratios(df, "class")
fr.sort_values(ascending=False).head(10)
# Horizontal bar chart of all scores, best features at the top.
plt.figure(tight_layout=True, figsize=(8, 0.15 * len(fr)))
fr[::-1].plot.barh()
# Scores in original feature order (reveals clusters of related features).
plt.figure(tight_layout=True, figsize=(9.6, 3))
fr.plot(marker=".", linestyle="None")
TODO: wrap the Fisher-ratio filtering in an sklearn feature-selection transformer so it can be used as a pipeline step.
# threshold = 0.1
# df_filtered = df[[*fr[fr > threshold].index, "class"]]
# Split into feature matrix X and integer-coded label vector y.
dfX = df.drop(columns="class")
dfy = df["class"].cat.codes  # categorical labels -> integer codes
X = dfX.to_numpy()
y = dfy.to_numpy()
# train/test split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import SelectKBest, RFE
from sklearn.decomposition import PCA, FastICA
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC, LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier, GradientBoostingClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.pipeline import make_pipeline
from sklearn.metrics import classification_report
# feature scaling
scaler = MinMaxScaler()
# feature selection (pick one alternative; None = skip the selection step)
n_features = 100
selector = None
#selector = SelectKBest(k=n_features)
#selector = RFE(RandomForestClassifier(n_jobs=-1), n_features_to_select=n_features, step=10)
#selector = PCA(n_components=n_features)
#selector = FastICA(n_components=n_features)
# classifier (pick one alternative)
#clf = GaussianNB()
clf = LinearSVC(dual=False) # dual=False if n_samples > n_features
#clf = SVC(kernel="linear", probability=True)
#clf = SVC(kernel="poly", probability=True)
#clf = GradientBoostingClassifier()
#clf = RandomForestClassifier(n_estimators=10, n_jobs=-1)
#clf = ExtraTreesClassifier(n_estimators=100, n_jobs=-1)
#clf = LinearDiscriminantAnalysis()
#clf = QuadraticDiscriminantAnalysis()
# build pipeline
pipeline = make_pipeline(
    *[step for step in (scaler, selector, clf) if step is not None] # ignore None steps
)
print(pipeline)
# train
%time pipeline.fit(X_train, y_train)
# scores
# Per-class precision/recall/F1 on the held-out test set.
scores = classification_report(
    y_test,
    pipeline.predict(X_test),
    target_names=df["class"].cat.categories  # map integer codes back to class names
)
print(scores)
# FIX: sklearn.metrics.plot_confusion_matrix was deprecated in scikit-learn 1.0
# and removed in 1.2; ConfusionMatrixDisplay.from_estimator is the replacement
# with the same estimator/X/y call signature.
from sklearn.metrics import ConfusionMatrixDisplay
# Row-normalized confusion matrix on the held-out test set.
class_names = df["class"].cat.categories
ConfusionMatrixDisplay.from_estimator(
    pipeline, X_test, y_test, display_labels=class_names, cmap=plt.cm.Blues, normalize="true"
)
plt.tight_layout()
%%time
# Repeated stratified hold-out cross-validation of the whole pipeline.
from sklearn.model_selection import StratifiedShuffleSplit, cross_validate
cv = StratifiedShuffleSplit(n_splits=5, test_size=0.25)
scores = cross_validate(pipeline, X, y, cv=cv, n_jobs=5, scoring=[
    "accuracy",
    "precision_macro",
    "recall_macro",
    #"roc_auc_ovo",
    #"roc_auc_ovr",
])
# Mean +/- standard deviation over the CV splits for each metric.
[f"CV {key}: {arr.mean():.2f} +/- {arr.std():0.2f}" for key, arr in scores.items()]
# Learning curve: CV accuracy as a function of training-set size.
plot_learning_curve(pipeline, X, y, cv=cv, scoring="accuracy")
Get the relevance of each feature from the trained classifier or from the feature-selection step.
def feature_importances(classifier, names):
    """Extract per-feature relevance scores from a fitted estimator or pipeline.

    Checks each step (in pipeline order) for a known score attribute and
    returns the first match as a Series indexed by ``names``.

    Parameters
    ----------
    classifier
        A fitted estimator, or a sklearn Pipeline whose steps are searched.
    names
        Feature names, one per column of the training data.

    Returns
    -------
    pd.Series or None
        Scores per feature (higher = more relevant), or None if no step
        exposes a recognized attribute.
    """
    steps = [classifier]
    # check if classifier is a pipeline
    if hasattr(classifier, "named_steps"):  # it's a pipeline
        # BUG FIX: iterate the *argument*, not the global `pipeline`.
        steps = list(classifier.named_steps.values())
    for step in steps:
        if hasattr(step, "scores_"):  # SelectKBest
            return pd.Series(step.scores_, index=names)
        if hasattr(step, "ranking_"):  # RFE: invert rank so higher = better
            return pd.Series(step.ranking_.max() - step.ranking_, index=names)
        if hasattr(step, "feature_importances_"):  # RandomForest and other ensembles
            return pd.Series(step.feature_importances_, index=names)
        if hasattr(step, "coef_"):  # linear models/SVM: sum |weights| over classes
            return pd.Series(abs(step.coef_).sum(axis=0), index=names)
    return None  # no step exposes a known relevance attribute
# Plot the per-feature relevance scores of the trained pipeline.
feature_names = df.drop(columns="class").columns
feature_scores = feature_importances(pipeline, feature_names)
plt.figure(tight_layout=True, figsize=(8, 0.2 * len(feature_scores)))
pd.Series(feature_scores, index=feature_names).sort_values().plot(kind='barh')
plt.xlabel('Score (higher is better)')
plt.ylabel('Feature');
First, the estimator is trained on the full set of features and the importance of each feature is obtained. The least important feature(s) are pruned from the current set of features. That procedure is repeated recursively on the pruned set. Based on the cross-validation scores of every iteration, the optimal number and selection of features can be determined.
from sklearn.preprocessing import minmax_scale
from sklearn.feature_selection import RFECV
from sklearn.ensemble import RandomForestClassifier
# Recursive feature elimination with cross-validation.
rfecv_step = 10  # number of features removed per RFE iteration
rfecv = RFECV(
    RandomForestClassifier(n_estimators=10, n_jobs=-1),
    step=rfecv_step, cv=5, n_jobs=-1,
)
%time rfecv.fit(minmax_scale(X), y)
print(f"Feature number of highest CV-score: {rfecv.n_features_}")
plt.figure(figsize=(9.8, 3), tight_layout=True)
# Reconstruct the feature count remaining at each RFE iteration (clipped at 1).
# NOTE(review): `grid_scores_` was removed in scikit-learn 1.2 — on newer
# versions use `rfecv.cv_results_["mean_test_score"]` instead.
n_features = np.clip(
    X.shape[1] - np.arange(len(rfecv.grid_scores_)) * rfecv_step, 1, None,
)[::-1]
plt.plot(n_features, rfecv.grid_scores_, "--o")
plt.xlabel("Number of selected features")
plt.ylabel("CV-score")
# X_selected = rfecv.transform(X)
# X_selected.shape
# Error analysis: which features best separate correctly from incorrectly
# classified samples?
dfp = df.drop(columns="class")
dfp["predict"] = (pipeline.predict(X) == y)  # True = correctly classified
fisher_ratios(dfp, "predict").sort_values(ascending=False).head(20)
boxenplot(dfp, "predict", logscale=True)
distplot(dfp, "predict")